/**
 * @file 
 *
 * Distributed client benchmark - starter
 *
 * @note This starter must be launched only after all followers are ready.
 */

#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <map>
#include <memory>
#include <random>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
using namespace std::chrono_literals;

#include "common/boost_log_helper.hpp"
#include <boost/program_options.hpp>

#include "defaults.hpp"
#include "client.hpp"
#include "ycsb_parser.hpp"

using namespace std;


int main(const int argc, const char **argv)
{
    const filesystem::path src_dir = 
        filesystem::absolute(argv[0]).parent_path().parent_path().parent_path();
    filesystem::path config_path;
    filesystem::path ycsb_load_path;
    string log_level;
    unsigned client_id;
    unsigned bench_duration_sec;
    {
        namespace po = boost::program_options;
        po::options_description desc;
        desc.add_options()
            ("config,C", po::value(&config_path),
                "Configuration file, if not given, will search for "
                "/etc/gestalt/gestalt.conf, ./gestalt.conf, "
                "./etc/gestalt/gestalt.conf, whichever comes first.")
            ("log,L", po::value(&log_level)->default_value("info"),
                "Logging level (Boost).")
            ("id,i", po::value(&client_id)->default_value(114514), "Specify client ID.")
            ("duration,D", po::value(&bench_duration_sec)->default_value(10),
                "Benchmark duration in seconds.")
            ("ycsb-load", po::value(&ycsb_load_path)
                    ->default_value(src_dir/"build"/"runtime"/"load.ycsb"),
                "YCSB load output")
            ;
        po::variables_map vm;
        po::store(po::parse_command_line(argc, argv, desc), vm);
        po::notify(vm);
    }

    set_boost_log_level(log_level);

    if (config_path.empty()) {
        for (auto &p : gestalt::defaults::config_paths) {
            if (!filesystem::is_regular_file(p))
                continue;
            config_path = p;
            break;
        }
    }
    if (!filesystem::is_regular_file(config_path)) {
        BOOST_LOG_TRIVIAL(fatal) << "Cannot find configuration file";
        exit(EXIT_FAILURE);
    }

    /* load working set to kv */

    smdsbz::ycsb_parser::trace ycsb_load;
    {
        namespace yp = smdsbz::ycsb_parser;
        ycsb_load.reserve(1e4);
        yp::parse(ycsb_load_path, ycsb_load);
    }
    BOOST_LOG_TRIVIAL(info) << "YCSB workload loaded";

    gestalt::Client coord_client(config_path, client_id);
    /* put sentinel placeholder, (0, 0) marks load not finished yet */
    {
        const int64_t zero_val = 0;
        if (int r = coord_client.put("start_at", &zero_val, sizeof(zero_val)); r) {
            errno = -r;
            boost_log_errno_throw(coord_client.put start_at);
        }
        if (int r = coord_client.put("end_at", &zero_val, sizeof(zero_val)); r) {
            errno = -r;
            boost_log_errno_throw(coord_client.put end_at);
        }
    }

    /* insert working set */
    /** @note insert collisions will be ignored */
    size_t successful_insertions = 0;
    {
        BOOST_LOG_TRIVIAL(info) << "Loading workload into Gestalt ...";
        for (const auto &d : ycsb_load) {
            uint8_t buf[4_K];
            /* HACK: we don't fill with actual data!
                but it does not affect performance, so it is actually okay not
                to bother, just do something and yisi-yisi :`) */
            std::strcpy(reinterpret_cast<char*>(buf), d.okey.c_str());
            // BOOST_LOG_TRIVIAL(trace) << "putting " << d.okey;
            int r = coord_client.put(d.okey.c_str(), buf, sizeof(buf));
            if (!r) {
                successful_insertions++;
                continue;
            }
            if (r == -EDQUOT) {
                BOOST_LOG_TRIVIAL(trace) << "failed inserting key " << d.okey
                    << ", ignored";
                continue;
            }
            errno = -r;
            boost_log_errno_throw(Client::put);
        }
        BOOST_LOG_TRIVIAL(info) << "Finished loading workload, loaded "
            << successful_insertions << " / " << ycsb_load.size()
            << " (" << 100. * successful_insertions / ycsb_load.size() << "%)";
    }

    /* Get! Set! Put sentinel broadcasting starting time. */

    /* wait a bit for all followers to poll timestamps */
    const auto start_at = std::chrono::system_clock::now() + 2s;
    /* run for fixed duration, calculate metrics by dividing with reported completed operations */
    /**
     * @note Followers need to __atomically__ increment counters, which can impact
     * bandwidth statistics.
     */
    const std::chrono::seconds test_duration{bench_duration_sec};
    BOOST_LOG_TRIVIAL(info) << "Test will run for " << test_duration.count() << "s";
    const auto end_at = start_at + test_duration;
    {
        const int64_t start_ts =
            std::chrono::duration_cast<std::chrono::microseconds>(
                start_at.time_since_epoch())
            .count();
        const int64_t end_ts =
            std::chrono::duration_cast<std::chrono::microseconds>(
                end_at.time_since_epoch())
            .count();
        if (int r = coord_client.put("start_at", &start_ts, sizeof(start_ts)); r) {
            errno = -r;
            boost_log_errno_throw(coord_client.put start_at);
        }
        if (int r = coord_client.put("end_at", &end_ts, sizeof(end_ts)); r) {
            errno = -r;
            boost_log_errno_throw(coord_client.put end_at);
        }
        BOOST_LOG_TRIVIAL(info) << "injected start_ts " << start_ts
            << ", end_ts " << end_ts;
    }

    while (std::chrono::system_clock::now() < end_at)
        std::this_thread::sleep_for(1s);
    BOOST_LOG_TRIVIAL(info) << "Test duration passed, we should stop";

    return EXIT_SUCCESS;
}
